use corro_agent::api::public::api_v1_db_schema;
use criterion::{
    black_box, criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput,
};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use std::time::{Duration, Instant};
use tokio::runtime::Runtime;
use tripwire::Tripwire;

use axum::Extension;
use corro_tests::{clone_test_agent, launch_test_agent};
use corro_types::{
    api::Statement,
    base::{CrsqlDbVersion, CrsqlSeq},
    broadcast::{ChangeSource, ChangeV1, Changeset},
    change::{row_to_change, Change},
};
use hyper::StatusCode;

use corro_agent::{
    agent::process_multiple_changes,
    api::public::{api_v1_transactions, TimeoutParams},
};

/// Configuration for a benchmark run
struct BenchConfig {
    /// Seed for the random number generator (same seed => reproducible data and workload)
    rng_seed: u64,
    /// Number of tables to create for the benchmark - each table will have the same schema
    number_of_bench_tables: usize,
    /// Number of initial rows in the benchmark table before even starting the benchmark
    initial_rows_per_table: usize,
    /// Number of transactions(changesets) processed at once.
    /// Changesets are pregenerated and shuffled for the max batch size;
    /// each benchmark run then replays a prefix of that list.
    batch_sizes: Vec<usize>,
    /// Operations per transaction - how many operations per transaction(changeset) to generate
    /// One operation might generate multiple changes
    operations_per_tx: usize,
    /// Ratio of insert:updates:deletes ops (must sum to 100)
    operation_mix: (u8, u8, u8),
    /// Whether to use partial changesets (benchmarks a different code path)
    /// If this is false then process_multiple_changes will only be called with complete changesets
    all_partial: bool,
}

/// Setup test agent with realistic data
/// The initial data is inserted using a single CTE query for efficiency AND is deterministic
/// 2 agents generated by this routine will have the exact same data
async fn setup_test_agent_with_data(config: &BenchConfig) -> eyre::Result<corro_tests::TestAgent> {
    // The tripwire will be tripped when exiting this function
    let (tripwire, _tripwire_worker, _tripwire_tx) = Tripwire::new_simple();
    let ta = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?;

    // Every benchmark table shares the same four-column schema; only the name differs.
    for table_idx in 0..config.number_of_bench_tables {
        let table_name = format!("bench_test_{table_idx}");
        let (status, _) = api_v1_db_schema(
            Extension(ta.agent.clone()),
            axum::Json(vec![format!(
                "CREATE TABLE IF NOT EXISTS {table_name} (
                id INTEGER NOT NULL PRIMARY KEY,
                value TEXT,
                counter INTEGER,
                random INTEGER
            ) WITHOUT ROWID;
            "
            )]),
        )
        .await;
        assert_eq!(status, StatusCode::OK);

        // Insert initial data using a single CTE query for efficiency
        // Uses a simple LCG for deterministic results; the seed is offset by
        // `table_idx` so each table gets a distinct `random` column sequence.
        if config.initial_rows_per_table > 0 {
            let (status, _) = api_v1_transactions(
                Extension(ta.agent.clone()),
                axum::extract::Query(TimeoutParams { timeout: None }),
                axum::Json(vec![Statement::WithParams(
                    format!(
                        "WITH RECURSIVE cnt(x, seed) AS (
                        SELECT 1, {}
                        UNION ALL
                        SELECT x+1, (1664525 * seed + 1013904223) & 4294967295 FROM cnt WHERE x < {}
                    )
                    INSERT INTO {table_name} (id, value, counter, random)
                    SELECT x, 'initial_' || x, 0, seed FROM cnt",
                        config.rng_seed + table_idx as u64,
                        config.initial_rows_per_table
                    ),
                    vec![],
                )]),
            )
            .await;
            assert_eq!(status, StatusCode::OK);
        }
    }

    Ok(ta)
}

/// Generate realistic changesets from actual database operations
///
/// Executes `max(config.batch_sizes)` transactions against `ta`, then reads
/// the resulting change rows back out of `crsql_changes` (one changeset per
/// db version) so they can later be replayed into a different agent via
/// `process_multiple_changes`.
async fn generate_changesets(
    ta: &corro_tests::TestAgent,
    config: &BenchConfig,
) -> eyre::Result<Vec<(ChangeV1, ChangeSource, Instant)>> {
    let mut rng = StdRng::seed_from_u64(config.rng_seed);
    let mut changesets = Vec::new();

    // Snapshot the db version before writing so that only the changes
    // produced by the transactions below get collected.
    let start_version =
        ta.agent
            .pool()
            .read()
            .await?
            .query_row("SELECT crsql_db_version()", [], |row| {
                row.get::<_, CrsqlDbVersion>(0)
            })?;

    // Generate transactions with mixed operations
    let max_transactions = *config.batch_sizes.iter().max().unwrap();
    for tx_idx in 0..max_transactions {
        let mut statements = Vec::new();
        for _ in 0..config.operations_per_tx {
            let table_idx = rng.random_range(0..config.number_of_bench_tables);
            let table_name = format!("bench_test_{table_idx}");

            // Pick the operation type according to the configured mix
            // (insert_pct + update_pct + delete_pct is expected to sum to 100).
            let op_type = rng.random_range(0..100);
            let (insert_pct, update_pct, _delete_pct) = config.operation_mix;

            let stmt = if op_type < insert_pct {
                // Insert - use high IDs to avoid conflicts
                let id = config.initial_rows_per_table + tx_idx * 1000 + rng.random_range(0..1000);
                Statement::WithParams(
                    format!("INSERT OR REPLACE INTO {table_name} (id, value, counter, random) VALUES (?, ?, ?, ?)"),
                    vec![(id as i64).into(), format!("value_{id}").into(), rng.random_range(0..100000).into(), 0.into()],
                )
            } else if op_type < insert_pct + update_pct {
                // Update - use existing IDs
                let id = rng.random_range(1..=config.initial_rows_per_table.min(1000));
                Statement::WithParams(
                    format!("UPDATE {table_name} SET counter = counter + 1, value = ?, random = ? WHERE id = ?"),
                    vec![format!("updated_{}", rng.random_range(0..1000)).into(), rng.random_range(0..100000).into(), (id as i64).into()],
                )
            } else {
                // Delete
                let id = rng.random_range(1..=config.initial_rows_per_table.min(1000));
                Statement::WithParams(
                    format!("DELETE FROM {table_name} WHERE id = ?"),
                    vec![(id as i64).into()],
                )
            };
            statements.push(stmt);
        }

        // Execute transaction
        let (status, _) = api_v1_transactions(
            Extension(ta.agent.clone()),
            axum::extract::Query(TimeoutParams { timeout: None }),
            axum::Json(statements),
        )
        .await;
        assert_eq!(status, StatusCode::OK);
    }

    // Fetch all changes since start_version
    let end_version =
        ta.agent
            .pool()
            .read()
            .await?
            .query_row("SELECT crsql_db_version()", [], |row| {
                row.get::<_, CrsqlDbVersion>(0)
            })?;

    let conn = ta.agent.pool().read().await?;
    for version in (start_version.0 + 1)..=end_version.0 {
        let version = CrsqlDbVersion(version);

        let changes: Vec<Change> = conn
            .prepare_cached(
                "SELECT \"table\", pk, cid, val, col_version, db_version, seq, site_id, cl 
                FROM crsql_changes WHERE db_version = ? 
                ORDER BY seq ASC",
            )?
            .query_map([version], row_to_change)?
            .collect::<Result<Vec<_>, _>>()?;

        // A transaction of pure no-ops (e.g. deleting an already-deleted row)
        // produces no change rows for its version; skip those.
        if changes.is_empty() {
            continue;
        }

        let last_seq = changes.iter().map(|c| c.seq).max().unwrap_or(CrsqlSeq(0));
        let seqs = CrsqlSeq(0)..=last_seq;

        changesets.push((
            ChangeV1 {
                actor_id: ta.agent.actor_id(),
                changeset: Changeset::Full {
                    version,
                    changes,
                    seqs: seqs.into(),
                    last_seq,
                    ts: ta.agent.clock().new_timestamp().into(),
                },
            },
            ChangeSource::Broadcast,
            Instant::now(),
        ));
    }

    // Convert to partial changesets if requested
    if config.all_partial {
        changesets = make_changesets_partial(changesets);
    }

    // Shuffle adjacent changesets to simulate realistic arrival order.
    // Shuffling within disjoint windows of 10 means a changeset moves at most
    // 9 places from its original position and never crosses a window boundary.
    use rand::seq::SliceRandom;
    for chunk in changesets.chunks_mut(10) {
        chunk.shuffle(&mut rng);
    }

    Ok(changesets)
}

/// Convert full changesets to partial (buffered) changesets
/// by adding 10 to the last_seq.
///
/// Bumping `last_seq` past the highest sequence actually present makes the
/// changeset report itself as incomplete, which exercises the buffered
/// (partial) code path in `process_multiple_changes`.
fn make_changesets_partial(
    mut changesets: Vec<(ChangeV1, ChangeSource, Instant)>,
) -> Vec<(ChangeV1, ChangeSource, Instant)> {
    for (change, _source, _instant) in changesets.iter_mut() {
        // Only the two "full" variants carry a `last_seq` to inflate;
        // everything else passes through untouched.
        match &mut change.changeset {
            Changeset::Full { last_seq, .. } | Changeset::FullV2 { last_seq, .. } => {
                *last_seq = *last_seq + 10;
            }
            _ => {}
        }
    }
    changesets
}

/// Run one criterion benchmark group with the given `config`.
///
/// Flow: generate changesets once from a throwaway source agent, build a
/// template agent with identical initial data, then for every batch size
/// measure only `process_multiple_changes` applying a prefix of those
/// changesets onto a fresh clone of the template agent.
fn run_benchmark_with_config(c: &mut Criterion, bench_name: &str, config: &BenchConfig) {
    let rt = Runtime::new().unwrap();

    // Pre-generate changesets once and discard the node which generated them
    let changesets = rt.block_on(async {
        let source = setup_test_agent_with_data(config)
            .await
            .expect("Failed to setup source agent");
        generate_changesets(&source, config)
            .await
            .expect("Failed to generate changesets")
    });

    // Setup an agent which we will clone for each benchmark iteration
    // It should have the same data as the node used for generating changesets, but not the generated changesets
    // (setup_test_agent_with_data is deterministic, so both start from the same rows)
    let template_agent = rt.block_on(async {
        setup_test_agent_with_data(config)
            .await
            .expect("Failed to setup template agent")
    });

    // Sanity check if the benchmark data is correct
    // We will be inserting changes into a new node, so the actor_id should be different
    assert!(template_agent.agent.actor_id() != changesets[0].0.actor_id);
    // We actually have the initial data generated
    for table_idx in 0..config.number_of_bench_tables {
        let table_name = format!("bench_test_{table_idx}");
        let sanity_check_row_count: usize = rt.block_on(async {
            template_agent
                .client()
                .pool()
                .get()
                .await
                .expect("Failed to get connection")
                .query_row(&format!("SELECT COUNT(*) FROM {table_name}"), [], |row| {
                    row.get(0)
                })
                .unwrap()
        });
        assert!(sanity_check_row_count == config.initial_rows_per_table);
    }
    // We generated enough changesets for the largest batch size
    let max_transactions = *config.batch_sizes.iter().max().unwrap();
    assert!(changesets.len() >= max_transactions);
    // Each changeset should have at least the number of operations per transaction
    for (changeset, _, _) in &changesets {
        assert!(!changeset.is_empty());
        //assert!(changeset.len() >= config.operations_per_tx); // Sometimes a delete is a no-op
        if config.all_partial {
            assert!(!changeset.is_complete());
        } else {
            assert!(changeset.is_complete());
        }
    }

    let mut group = c.benchmark_group(bench_name);

    // Test different batch sizes
    for batch_size in config.batch_sizes.iter() {
        let batch_size = *batch_size;
        if batch_size > changesets.len() {
            panic!(
                "Not enough changesets for batch size {batch_size}. {} changesets are available",
                changesets.len()
            );
        }

        // See how many changes are in the batch so we can measure the throughput accurately
        let changes_in_batch: u64 = changesets[..batch_size]
            .iter()
            .map(|(changeset, _, _)| changeset.len() as u64)
            .sum();
        group.throughput(Throughput::Elements(changes_in_batch));
        group.bench_with_input(
            BenchmarkId::from_parameter(batch_size),
            &batch_size,
            |b, &size| {
                b.iter_batched(
                    // Setup is not measured
                    // Clones template agent to ensure identical DB state for each iteration
                    || {
                        let (tripwire, tripwire_worker, tripwire_tx) = Tripwire::new_simple();
                        let clone_agent = rt.block_on(async {
                            clone_test_agent(&template_agent, tripwire)
                                .await
                                .expect("Failed to clone agent")
                        });
                        // This should be a full clone
                        assert!(clone_agent.agent.actor_id() == template_agent.agent.actor_id());

                        // The tripwire worker/tx are returned so they stay alive
                        // for the duration of the measured routine.
                        let changes = changesets[..size].to_vec();
                        (clone_agent, changes, tripwire_worker, tripwire_tx)
                    },
                    |(clone_agent, changes, _tripwire_worker, _tripwire_tx)| {
                        // Measurement: Only this is timed
                        rt.block_on(async {
                            let result = process_multiple_changes(
                                clone_agent.agent.clone(),
                                clone_agent.bookie.clone(),
                                changes,
                                Duration::from_secs(30),
                            )
                            .await;

                            black_box(result)
                        })
                    },
                    BatchSize::PerIteration, // So we don't steal IOPS among runs
                );
            },
        );
    }

    group.finish();
}

impl Default for BenchConfig {
    fn default() -> Self {
        Self {
            rng_seed: 1337,
            number_of_bench_tables: 1,
            initial_rows_per_table: 1_000_000,
            batch_sizes: vec![1, 2, 5, 10, 25, 50, 100],
            operations_per_tx: 10,
            operation_mix: (40, 50, 10), // 40% insert, 50% update, 10% delete
            all_partial: false,
        }
    }
}

/// Benchmark: Test how quickly we can sync remote changesets
fn bench_fulls_one_large_table_mixed(c: &mut Criterion) {
    let config = BenchConfig {
        number_of_bench_tables: 1,
        initial_rows_per_table: 1_000_000,
        ..Default::default()
    };
    run_benchmark_with_config(c, "bench_fulls_one_large_table_mixed", &config);
}

/// Same mixed workload as the single-table run, spread over four tables.
fn bench_fulls_four_large_tables_mixed(c: &mut Criterion) {
    let config = BenchConfig {
        number_of_bench_tables: 4,
        initial_rows_per_table: 1_000_000,
        ..Default::default()
    };
    run_benchmark_with_config(c, "bench_fulls_four_large_tables_mixed", &config);
}

/// Insert-only workload (100/0/0 mix) against a single large table.
fn bench_fulls_one_large_table_insert_only(c: &mut Criterion) {
    let config = BenchConfig {
        number_of_bench_tables: 1,
        initial_rows_per_table: 1_000_000,
        operation_mix: (100, 0, 0),
        ..Default::default()
    };
    run_benchmark_with_config(c, "bench_fulls_one_large_table_insert_only", &config);
}

/// Update-only workload (0/100/0 mix) against a single large table.
fn bench_fulls_one_large_table_update_only(c: &mut Criterion) {
    let config = BenchConfig {
        number_of_bench_tables: 1,
        initial_rows_per_table: 1_000_000,
        operation_mix: (0, 100, 0),
        ..Default::default()
    };
    run_benchmark_with_config(c, "bench_fulls_one_large_table_update_only", &config);
}

/// Delete-only workload (0/0/100 mix) against a single large table.
fn bench_fulls_one_large_table_delete_only(c: &mut Criterion) {
    let config = BenchConfig {
        number_of_bench_tables: 1,
        initial_rows_per_table: 1_000_000,
        operation_mix: (0, 0, 100),
        ..Default::default()
    };
    run_benchmark_with_config(c, "bench_fulls_one_large_table_delete_only", &config);
}

// Partial perf won't depend on the number of tables or their size
fn bench_partials(c: &mut Criterion) {
    let config = BenchConfig {
        all_partial: true,
        number_of_bench_tables: 1,
        initial_rows_per_table: 100_000,
        ..Default::default()
    };
    run_benchmark_with_config(c, "bench_partials", &config);
}

// Per-iteration setup is heavy (each iteration clones the template agent), so
// keep the sample size small and the warm-up short to bound total runtime.
criterion_group! {
    name = benches;
    config = Criterion::default()
        .measurement_time(Duration::from_secs(15))
        .warm_up_time(Duration::from_secs(1))
        .sample_size(10);
    targets =
        bench_fulls_one_large_table_mixed,
        bench_fulls_four_large_tables_mixed,
        bench_fulls_one_large_table_insert_only,
        bench_fulls_one_large_table_update_only,
        bench_fulls_one_large_table_delete_only,
        bench_partials
}

criterion_main!(benches);
